package com.xj.kafka.thread;

import com.xj.kafka.consumer.IMessageDispose;
import kafka.consumer.KafkaStream;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

import java.nio.ByteBuffer;

/**
 * Message-processing thread.
 *
 * <p>A {@link Runnable} that iterates over a {@link KafkaStream}, copies the
 * payload of every message into a fresh {@code byte[]} and hands that array to
 * the configured {@link IMessageDispose}.
 *
 * <p>No exceptions are caught here: anything thrown while iterating the stream
 * or by {@link IMessageDispose#dispose} propagates out of {@link #run()} and
 * ends the loop.
 *
 * User: bjxiajun
 * Date: 13-10-18
 * Time: 12:57 PM
 */
public class MessageDisposeThread implements Runnable {
    /** Stream the messages are read from; assigned once in the constructor. */
    private final KafkaStream<Message> stream;
    /** Handler that receives the raw payload bytes of each message. */
    private final IMessageDispose messageDispose;

    /**
     * Creates a task bound to one stream and one handler.
     *
     * @param stream         the stream to iterate in {@link #run()}
     * @param messageDispose the handler invoked once per message with the payload bytes
     */
    public MessageDisposeThread(KafkaStream<Message> stream, IMessageDispose messageDispose) {
        this.stream = stream;
        this.messageDispose = messageDispose;
    }

    /**
     * Iterates the stream and passes each message payload, copied into a new
     * byte array, to the handler. Returns when the stream's iterator reports
     * no further elements.
     */
    @Override
    public void run() {
        for (MessageAndMetadata<Message> msgAndMetadata : stream) {
            // message() is already typed as Message, so no cast is required.
            Message message = msgAndMetadata.message();
            messageDispose.dispose(copyRemaining(message.payload()));
        }
    }

    /**
     * Copies the bytes between the buffer's position and limit into a new array.
     *
     * @param buffer the payload buffer to read
     * @return a new array holding exactly {@code buffer.remaining()} bytes
     */
    private static byte[] copyRemaining(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        // clear() does not erase data; it only resets position to 0 and limit to capacity.
        // NOTE(review): likely unnecessary if payload() returns a fresh view per call -
        // confirm against the Kafka version in use before removing.
        buffer.clear();
        return bytes;
    }
}
